Spark流处理中定时更新广播变量值

您所在的位置:网站首页 spark streaming 广播变量 Spark流处理中定时更新广播变量值

Spark流处理中定时更新广播变量值

2024-07-12 12:46| 来源: 网络整理| 查看: 265

Spark流处理中定时更新广播变量值

在实际项目应用上,某些需求会有更新静态规则表的情况,如消息过滤规则、风控规则等。通常这样的表数据量不会大,在spark中使用广播变量的形式使用,而广播变量是不支持更新的,怎样在流处理过程中更新,下面分别论述Spark streaming和Structured streaming的场景。

一、Spark streaming

可以利用单例模式定时的删除已经广播的值,同时获取新的变量值重新广播,假如要广播的是RDS中的表,代码示例如下:

注意事项: spark streaming会为每一个流创建job,为了不同job间互不影响,需在foreachRDD、transform算子内进行变量的广播操作此方法仅适用于spark streaming,structured streaming需使用其他方法做广播变量的更新 /** * 定时更新广播变量,数据从jdbc数据源读取 */ @Slf4j public class JDBCBroadcastPeriodicUpdater { private static final int PERIOD = 30 * 1000; //更新周期,秒 private static volatile JDBCBroadcastPeriodicUpdater instance; private Broadcast broadcast; private long lastUpdate = 0L; private JDBCBroadcastPeriodicUpdater() {} public static JDBCBroadcastPeriodicUpdater getInstance() { if (instance == null) { synchronized (JDBCBroadcastPeriodicUpdater.class) { if (instance == null) { instance = new JDBCBroadcastPeriodicUpdater(); } } } return instance; } /** * 更新广播变量 */ public Broadcast updateAndGet(SparkSession spark, DruidDataSource dataSource, String sql) { SparkContext sc = spark.sparkContext(); long now = System.currentTimeMillis(); long offset = now - lastUpdate; if (offset > PERIOD || null == broadcast) { if (broadcast != null) { // 删除已获取的广播变量值 broadcast.unpersist(); } lastUpdate = now; // 重新广播新的变量值 List value = fetchBroadcastData(dataSource, sql); broadcast = JavaSparkContext.fromSparkContext(sc).broadcast(value); } return broadcast; } /** * 获取需要广播的数据 */ @SneakyThrows private List fetchBroadcastData(DruidDataSource dataSource, String sql) { List result = new ArrayList(); DruidPooledConnection conn = dataSource.getConnection(); PreparedStatement ps = conn.prepareStatement(sql); ResultSet data = ps.executeQuery(); ResultSetMetaData metaData = data.getMetaData(); int colCount = metaData.getColumnCount(); while (data.next()) { HashMap row = new HashMap(); for (int i=0; i


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3